[WIP][ray] Ray merge into #8028

Draft
XiaoHongbo-Hope wants to merge 32 commits into apache:master from XiaoHongbo-Hope:ray_merge_into

@XiaoHongbo-Hope
Contributor

Purpose

Tests

Pythonic MERGE INTO on Ray Datasets, mirroring Spark/Flink merge-into.
UPSERT-flavored clauses (matched-update, not-matched-insert,
not-matched-by-source-update) supported; DELETE raises NotImplementedError
pending KeyValueDataWriter row-kind work.

API:
    from pypaimon.ray import merge_paimon
    merge_paimon(target, source, catalog_options,
                 on=[...],
                 when_matched_update={...},
                 when_not_matched_insert="*")

Algorithm: read target -> tag _side -> union -> groupby(on).map_groups
to classify matched/not-matched and apply SET; write back via write_paimon
(PK upsert through _SEQUENCE_NUMBER).
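
The tag/union/group/classify flow above can be illustrated with a minimal pure-Python sketch. This is not the PR's code: `merge_rows` and `set_cols` are hypothetical names, rows are plain dicts rather than Ray blocks, and only the matched-update and not-matched-insert clauses are modeled.

```python
from collections import defaultdict


def merge_rows(target, source, on, set_cols):
    """Tag rows by side, union them, group by the ON key, classify each group."""
    tagged = [("t", row) for row in target] + [("s", row) for row in source]

    groups = defaultdict(lambda: {"t": [], "s": []})
    for side, row in tagged:
        groups[tuple(row[k] for k in on)][side].append(row)

    to_write = []
    for sides in groups.values():
        t_rows, s_rows = sides["t"], sides["s"]
        if t_rows and s_rows:
            # Matched: copy the SET columns from the source row onto the target row.
            merged = dict(t_rows[0])
            for col in set_cols:
                merged[col] = s_rows[0][col]
            to_write.append(merged)
        elif s_rows:
            # Not matched: insert the source row as-is ("*").
            to_write.append(dict(s_rows[0]))
        # Target-only groups are unchanged, so nothing is written for them.
    return to_write
```

Only changed and new rows are returned, on the assumption that a primary-key upsert leaves untouched target rows in place.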

Known bugs to fix in follow-up:
- _schema_type_map referenced but never defined (NameError on call)
- for f in batch.schema iterates pa.Schema (TypeError on pyarrow >= 18)
- type-mismatch fallback to pa.null() destroys join keys
- test helper _make_pk_table_with_flag returns 1 value, test unpacks 2
- _schema_type_map called but undefined: NameError on any cross-schema merge.
- for f in batch.schema raises TypeError on pyarrow >= 18.
- type-mismatch fallback to pa.null() drops join key values.
- _make_pk_table_with_flag returned 1 value but caller unpacks 2.
…rop API

- pa.Table.drop deprecated in newer pyarrow; switch to drop_columns.
- matched branch silently produced cartesian product on multiple source rows.
- _required_target_cols_for_passthrough widened projection to all columns
  when its spec was None, defeating the projection optimization.
@XiaoHongbo-Hope changed the title from "[python][ray] Ray merge into" to "[WIP][ray] Ray merge into" on May 28, 2026
XiaoHongbo-Hope and others added 11 commits May 29, 2026 00:16
Replace the driver-side matched-id set collection in the not-matched
INSERT path with a distributed left_anti join on the per-row id, matching
Spark's single LeftAnti predicate. Partition count is sized to the matched
row count to keep hash partitions dense, since ray's join fails on empty
partitions.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
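
A rough sketch of what a hash-partitioned left-anti join computes, and of why the partition count is tied to the matched row count. This is a plain-Python stand-in, not Ray Data's join; `rows_per_partition` and the helper names are assumptions made for illustration.

```python
def hash_partition(rows, key, num_partitions):
    """Distribute rows into num_partitions buckets by hash of the key column."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[hash(row[key]) % num_partitions].append(row)
    return parts


def left_anti_join(source, matched, key, num_partitions):
    """Keep source rows whose key does not appear on the matched side."""
    out = []
    src_parts = hash_partition(source, key, num_partitions)
    matched_parts = hash_partition(matched, key, num_partitions)
    for src_part, matched_part in zip(src_parts, matched_parts):
        seen = {row[key] for row in matched_part}
        out.extend(row for row in src_part if row[key] not in seen)
    return out


def dense_partition_count(num_matched_rows, rows_per_partition=1024, cap=200):
    """Fewer matched rows means fewer partitions, so buckets are unlikely to be empty."""
    return max(1, min(cap, num_matched_rows // rows_per_partition))
```

With too many partitions for a small matched side, some entries of `matched_parts` are empty lists, which is the situation the commit message says Ray's join cannot handle.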
- Reject multi-source cardinality by default; add allow_multiple_matches
  opt-in for deterministic keep-first.
- Refuse blob-column writes loudly instead of emitting wrong-format files.
- Check Dataset.join (ray>=2.50) at call time and restore the ray extra
  floor to 2.10, so read/sink users on older ray are unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
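
The cardinality rule can be sketched as follows. Only the `allow_multiple_matches` flag name comes from the commit message; `check_cardinality` and the `(target_row_id, source_row)` pair layout are hypothetical.

```python
from collections import Counter


def check_cardinality(matched_pairs, allow_multiple_matches=False):
    """matched_pairs: (target_row_id, source_row) tuples in a deterministic order.

    Returns one source row per target row id, or raises if several source
    rows hit the same target row and the opt-in flag is not set.
    """
    counts = Counter(target_id for target_id, _ in matched_pairs)
    duplicated = sorted(tid for tid, n in counts.items() if n > 1)
    if duplicated and not allow_multiple_matches:
        raise ValueError(
            f"multiple source rows match target rows {duplicated}; "
            "pass allow_multiple_matches=True to keep the first match"
        )
    first = {}
    for target_id, source_row in matched_pairs:
        first.setdefault(target_id, source_row)  # keep-first
    return first
```

Keep-first is only deterministic if the input order is, which is why the sketch requires ordered pairs.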
@JingsongLi
Contributor

API Design Recommendations for Paimon Ray merge_into from AI:

Based on the design patterns from Delta Lake, Lance, Iceberg/Ray, and Paimon Spark's MergeIntoPaimonDataEvolutionTable:

  ---
  1. Use string expressions for predicates, not Python callables

  Delta/Lance both use SQL-like strings that can be analyzed, optimized, and pushed down:

  # Delta
  dt.merge(source, predicate="target.id = source.id") \
    .when_matched_update(updates={"name": "source.name"}, predicate="source.ts > target.ts")

  # Lance
  dataset.merge_insert("id").when_matched_update_all(condition="source.ts > target.ts")

  PR #8028 uses Python lambdas — these cannot be inspected for optimization, cannot participate in predicate pushdown, and are fragile for serialization across Ray workers:

  # PR #8028 (problematic)
  when_matched_update_condition=lambda r: r['s.age'] > r['t.age']

  Recommendation: Use string expressions or a simple expression DSL. If full SQL parsing is too heavy, at minimum support column-reference strings like "s.col" for SET values (which the PR already does) and simple comparison expressions for conditions.
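
As a rough illustration of the "simple comparison expressions" option, the sketch below parses a condition such as "s.ts > t.ts" into a plain tuple that can be inspected and shipped to workers. The grammar (one comparison between two `s.`/`t.` column references) and the function names are assumptions, not part of the PR.

```python
import operator
import re

_OPS = {
    ">": operator.gt, ">=": operator.ge,
    "<": operator.lt, "<=": operator.le,
    "=": operator.eq, "!=": operator.ne,
}
# Two-character operators come first so ">=" is not read as ">".
_PATTERN = re.compile(r"^\s*([st]\.\w+)\s*(>=|<=|!=|>|<|=)\s*([st]\.\w+)\s*$")


def parse_condition(expr):
    """Parse '<side>.<col> <op> <side>.<col>' into a (left, op, right) tuple."""
    match = _PATTERN.match(expr)
    if match is None:
        raise ValueError(f"unsupported condition: {expr!r}")
    return match.groups()


def evaluate(parsed, row):
    """Evaluate a parsed condition against a joined row keyed like 's.ts'."""
    left, op, right = parsed
    return _OPS[op](row[left], row[right])
```

Unlike a lambda, the parsed tuple exposes which columns the condition reads, so a planner could project only those columns.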

  ---
  2. Align with Paimon Spark's execution model

  Paimon Spark's MergeIntoPaimonDataEvolutionTable uses:

  UPDATE path:  Target LEFT_OUTER JOIN Source → MergeRows → repartition(_FIRST_ROW_ID) → writePartialFields
  INSERT path:  Source LEFT_ANTI JOIN Target → MergeRows → write (full rows)

  The Ray implementation should mirror this:

  # UPDATE: inner join → extract (_ROW_ID, changed_cols) → partition by _FIRST_ROW_ID → partial write
  # INSERT: left_anti join → apply insert expressions → append write
  # COMMIT: atomic (update_msgs + insert_msgs), with snapshot conflict detection

  ---
  3. Recommended API shape

  from pypaimon.ray import merge_into, WhenMatched, WhenNotMatched

  merge_into(
      target="db.table",
      source=ray_dataset,  # or pa.Table, pd.DataFrame, str (table identifier)
      catalog_options={...},
      on=["id"],  # or {"target_col": "source_col"} for renamed keys
      when_matched=[
          WhenMatched(update="*"),                          # update all cols from source
          WhenMatched(update={"name": "s.name"}, condition="s.ts > t.ts"),  # conditional
      ],
      when_not_matched=[
          WhenNotMatched(insert="*"),                       # insert all cols
          WhenNotMatched(insert={"id": "s.id", "status": "'new'"}, condition="s.age > 18"),
      ],
  )

  Key differences from PR #8028:
  - condition is a string expression, not a callable
  - Drop merge_condition (non-standard semantics that routes unmatched rows to INSERT — confusing and diverges from SQL MERGE)
  - Return a metrics dict: {"num_matched": ..., "num_inserted": ..., "num_unchanged": ...}
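
One possible shape for the clause objects and the metrics dict, as a sketch only. The class and key names follow the comment above, while the validation rules and `empty_metrics` are assumptions.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Union

Spec = Union[str, Dict[str, str]]


def _validate(spec, condition):
    # A spec is either "*" or a {target_col: expression_string} mapping.
    if isinstance(spec, str):
        if spec != "*":
            raise ValueError("a string spec must be '*'")
    elif not isinstance(spec, dict):
        raise TypeError("spec must be '*' or a dict of column expressions")
    # Conditions stay strings so they can be inspected and serialized.
    if condition is not None and not isinstance(condition, str):
        raise TypeError("condition must be a string expression, not a callable")


@dataclass(frozen=True)
class WhenMatched:
    update: Spec
    condition: Optional[str] = None

    def __post_init__(self):
        _validate(self.update, self.condition)


@dataclass(frozen=True)
class WhenNotMatched:
    insert: Spec
    condition: Optional[str] = None

    def __post_init__(self):
        _validate(self.insert, self.condition)


def empty_metrics():
    """Counters a merge could fill in and return to the caller."""
    return {"num_matched": 0, "num_inserted": 0, "num_unchanged": 0}
```

Rejecting callables at construction time surfaces the API change early instead of failing later on a worker.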

  ---
  4. Do not invent custom commit conflict detection

  PR #8028 adds commit.strict-mode.last-safe-snapshot to file_store_commit.py — a custom mechanism that modifies the core commit path for all table operations.

  Paimon already has conflict detection via writer.rowIdCheckConflict(snapshotId) (used in the Spark implementation). The Ray connector should reuse the same mechanism:

  # Capture snapshot before read
  plan = table.newSnapshotReader().read()
  # ... do merge work ...
  # Conflict check at commit time (existing Paimon mechanism)
  writer.rowIdCheckConflict(plan.snapshotId())
  writer.commit(update_msgs + insert_msgs)

  ---
  5. Follow Iceberg/Ray's two-phase separation

  Iceberg's Ray connector cleanly separates:
  - Phase 1 (distributed workers): write files, return metadata
  - Phase 2 (driver): collect metadata, resolve conflicts, atomic commit

  For Paimon:
  - Phase 1 (distributed):
    - UPDATE: inner_join → map_batches(apply_clauses) → groupby(_FIRST_ROW_ID).map_groups(partial_write) → return commit messages
    - INSERT: left_anti_join → map_batches(apply_clauses) → write_datasink → return commit messages
  - Phase 2 (driver): collect all commit messages → single atomic commit with conflict check
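
The two phases can be modeled with a toy table, as sketched below. `CommitMessage`, `FakeTable` and `worker_write` are hypothetical stand-ins and do not correspond to pypaimon classes; the snapshot comparison only approximates real conflict detection.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class CommitMessage:
    """Metadata a worker returns after writing files (hypothetical)."""
    kind: str                  # "update" or "insert"
    files: Tuple[str, ...]


class SnapshotConflict(Exception):
    pass


class FakeTable:
    """Toy table tracking only a snapshot id and the committed messages."""

    def __init__(self, snapshot_id: int):
        self.snapshot_id = snapshot_id
        self.committed: List[CommitMessage] = []

    def commit(self, read_snapshot_id: int, messages: List[CommitMessage]) -> int:
        # Phase 2 (driver): reject the whole batch if the table moved since the read.
        if read_snapshot_id != self.snapshot_id:
            raise SnapshotConflict(
                f"read snapshot {read_snapshot_id}, latest is {self.snapshot_id}"
            )
        self.committed.extend(messages)
        self.snapshot_id += 1
        return self.snapshot_id


def worker_write(kind: str, block_id: int) -> CommitMessage:
    # Phase 1 (worker): write files and return metadata; nothing is visible yet.
    return CommitMessage(kind, (f"{kind}-{block_id}.data",))
```

Because update and insert messages go through a single `commit` call, a conflict leaves neither half of the merge visible.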

  ---
  6. Avoid driver-side materialization of large datasets

  PR #8028 collects target keys or matched indices into driver-side Python dicts/sets — this will OOM on large tables.

  Better alternatives (from Iceberg/Ray):
  - Use ray.put(keys_table) to broadcast key sets via object store (supports spill-to-disk)
  - Use Ray Data join() for distributed matching instead of collecting to driver
  - Use coarse range filters to prune unrelated target files before reading
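
The coarse range filter in the last bullet might look like the sketch below. It assumes per-file min/max statistics on the join key are available as a plain dict; `prune_files` and that layout are hypothetical.

```python
def prune_files(file_stats, source_keys):
    """Keep target files whose [min, max] key range overlaps the source key range.

    file_stats: {file_name: (min_key, max_key)}
    source_keys: iterable of join-key values from the source
    """
    keys = list(source_keys)
    if not keys:
        return []
    lo, hi = min(keys), max(keys)
    return [
        name
        for name, (file_min, file_max) in file_stats.items()
        if file_max >= lo and file_min <= hi
    ]
```

The filter is conservative: a kept file may still contain no matching key, but a pruned file cannot contain one.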

  ---
  7. Minimum viable scope for a first PR

  Given the complexity, a reasonable first PR should:
  1. Support only when_matched_update("*") + when_not_matched_insert("*") (no conditions, no partial SET)
  2. Use Ray Data join() for both paths (inner + left_anti)
  3. Reuse existing TableUpdateByRowId for the update path
  4. Reuse existing write_paimon for the insert path
  5. Atomic commit with existing Paimon snapshot conflict detection
  6. No merge_condition, no self-merge optimization, no vectorized fast paths

  Then iterate with follow-up PRs for conditions, partial SET, optimizations, etc.

The update path grouped by _FIRST_ROW_ID with ray's default 200 hash
partitions, spawning hundreds of empty reduce tasks on small and medium
merges. Cap the groupby partitions at the distinct group count (one per
target data file), bounded by 200 so large tables keep today's behavior.

Verified on a 2-node ray cluster: an 18000-row merge drops the shuffle
from num_partitions=200 to 4 with no correctness change.
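
The cap described above amounts to a small clamp. In this sketch the function name is hypothetical, and 200 is the default partition count quoted in the commit message.

```python
def groupby_partitions(first_row_ids, default_partitions=200):
    """One partition per distinct _FIRST_ROW_ID, never more than the default."""
    distinct_groups = len(set(first_row_ids))
    return max(1, min(distinct_groups, default_partitions))
```

For a merge touching four data files this yields 4 partitions, in line with the 200-to-4 drop reported above; a table with more than 200 files still gets 200.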
_assign_frid mapped each matched _ROW_ID to its file's first-row-id with
a per-row Python bisect over to_pylist(), a CPU hot spot when many rows
match. Replace it with a single numpy searchsorted over the matched
batch, keeping the null and out-of-range guards.

Verified: 26 ray merge_into unit tests pass; an 18000-row merge on a
2-node ray cluster stays correct.
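
A sketch of the vectorized lookup, assuming files are described by sorted first-row-ids and row counts, at least one file exists, and null row ids are pre-encoded as -1. The function name and the -1 sentinel are illustrative and not taken from the PR.

```python
import numpy as np


def assign_first_row_id(row_ids, file_first_row_ids, file_row_counts):
    """Map each row id to the first-row-id of its file, or -1 if it has none."""
    starts = np.asarray(file_first_row_ids, dtype=np.int64)  # sorted ascending
    counts = np.asarray(file_row_counts, dtype=np.int64)
    ids = np.asarray(row_ids, dtype=np.int64)

    # Index of the last file whose first-row-id is <= the row id.
    idx = np.searchsorted(starts, ids, side="right") - 1

    valid = idx >= 0                       # guards nulls (-1) and ids below every file
    safe = np.where(valid, idx, 0)         # avoid negative indexing for invalid rows
    valid &= ids < starts[safe] + counts[safe]  # guards ids past the file's last row
    return np.where(valid, starts[safe], -1)
```

A single `searchsorted` call replaces one Python-level bisect per row, and the two guards are kept as boolean masks.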
When both when_matched and when_not_matched run on a non-empty target,
build the UPDATE and INSERT datasets from one materialized LEFT_OUTER
join instead of reading and shuffling the target table twice. The join
shuffle dominates cost at scale, so routing matched (non-null target)
and not-matched (null target) rows from a single join halves it.
Add two cases for the single-outer-join path: combined update+insert
under a merge_condition, and a matched clause-level condition with no
merge_condition (the full-target-read branch).
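
Routing both paths from one join can be sketched in plain Python as below. The source is taken as the left side, so a missing target shows up as `None`; the helper names are hypothetical and target keys are assumed unique.

```python
def left_outer_join(source, target, key):
    """Pair every source row with its target row, or None when there is no match."""
    target_by_key = {row[key]: row for row in target}
    return [(src, target_by_key.get(src[key])) for src in source]


def route(joined):
    """Split one joined result into the UPDATE and INSERT inputs."""
    updates = [(src, tgt) for src, tgt in joined if tgt is not None]
    inserts = [src for src, tgt in joined if tgt is None]
    return updates, inserts
```

The target is read and joined once, and both clause paths consume the same joined result.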
…pty-target insert

- '*' SET spec resolves a renamed ON key via the source column,
  preventing NULL writes into the key column.
- Inserting into an empty target skips all joins, avoiding ray's
  empty hash-partition crash.
- Replace the zip(range) src-index with a deterministic per-block
  running offset: no realignment shuffle, no extra full copy, and no
  shuffle-materialize barrier (which deadlocked ray at <=2 CPUs).
  Handles both pandas- and arrow-backed source blocks.
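
The per-block running offset in the last bullet is an exclusive prefix sum over block sizes. The sketch below uses lists as stand-ins for source blocks; the function names are hypothetical.

```python
from itertools import accumulate


def block_offsets(block_sizes):
    """Exclusive prefix sum: the global index of each block's first row."""
    sizes = list(block_sizes)
    return [0] + list(accumulate(sizes))[:-1] if sizes else []


def assign_src_index(blocks):
    """Attach a global, deterministic index to every row, block by block."""
    offsets = block_offsets(len(block) for block in blocks)
    return [
        [(offset + i, row) for i, row in enumerate(block)]
        for offset, block in zip(offsets, blocks)
    ]
```

Each block needs only its own offset, so indices can be assigned per block without moving rows between blocks.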